package com.danan.data_collector.initializer;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.danan.data_collector.config.EnvironmentConfig;
import com.danan.data_collector.config.KafkaConfig;
import com.danan.data_collector.config.SourceConfig;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.TopicSelector;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Properties;
import java.util.UUID;

/**
 * Created with IntelliJ IDEA.
 *
 * @Author: NanHuang
 * @Date: 2023/05/13/11:03
 * @Description: Spring configuration that wires up the Flink stream execution
 * environment and the exactly-once Kafka sink for the data-collector pipeline.
 */
@Configuration
public class EnvironmentInitializer {

    /**
     * Builds the exactly-once Kafka sink used to write collected records to the ODS topic.
     *
     * @param kafkaConfig  supplies the bootstrap servers and the ODS topic name
     * @param sourceConfig injected but not read here; kept so the bean signature stays stable
     * @return a {@link KafkaSink} that serializes record values as plain UTF-8 strings
     */
    @Bean
    public KafkaSink<String> kafkaSink(@Autowired KafkaConfig kafkaConfig, @Autowired SourceConfig sourceConfig) {
        Properties properties = new Properties();
        // Producer transaction timeout: 24 hours in ms.
        // NOTE(review): the Kafka broker default transaction.max.timeout.ms is 15 minutes;
        // a producer requesting 24h is rejected unless the broker limit was raised — confirm
        // the broker configuration.
        properties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, String.valueOf(24 * 60 * 60 * 1000));
//        properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
//        properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        return KafkaSink.<String>builder()
                .setBootstrapServers(kafkaConfig.getBootstrapServer())
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic(kafkaConfig.getOdsTopicName())
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build()
                )
                .setKafkaProducerConfig(properties)
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // NOTE(review): a fresh random prefix on every JVM start means a restarted job
                // never fences/aborts transactions left open by the previous run; EXACTLY_ONCE
                // recovery normally wants a stable, job-unique prefix — confirm this is intended.
                .setTransactionalIdPrefix(UUID.randomUUID().toString())
                .build();
    }

    /**
     * Creates and configures the Flink stream execution environment: local web UI on
     * port 3001, optional parallelism from configuration, exactly-once checkpointing
     * every 60 seconds with externalized checkpoints retained on cancellation, and a
     * fixed-delay restart strategy.
     *
     * @param environmentConfig supplies the parallelism and the checkpoint storage path
     * @return the fully configured {@link StreamExecutionEnvironment}
     */
    @Bean
    public StreamExecutionEnvironment env(@Autowired EnvironmentConfig environmentConfig) {
        org.apache.flink.configuration.Configuration conf = new org.apache.flink.configuration.Configuration();
        conf.setInteger("rest.port", 3001); // local Flink web UI port
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        Integer parallelism = environmentConfig.getParallelism();
        if (parallelism != null) {
            env.setParallelism(parallelism);
        }
        // 2. State backend and checkpointing
//        env.setStateBackend(new EmbeddedRocksDBStateBackend()); // enable the RocksDB state backend
        env.enableCheckpointing(60 * 1000); // checkpoint interval: 60 s
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // at most one in-flight checkpoint
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000); // checkpoint timeout: 10 min
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getCheckpointConfig().setCheckpointStorage(environmentConfig.getCheckpointStorage1()); // checkpoint storage path
        // NOTE(review): tolerating 3600 consecutive checkpoint failures effectively disables
        // failure-triggered restarts from checkpointing — confirm this is intentional.
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3600);
        // Restart strategy: 3 attempts, 30 s apart.
        // FIX: the delay argument of fixedDelayRestart(int, long) is in milliseconds; the
        // original passed a bare 30 (= 30 ms), retrying almost immediately. 30 s matches the
        // explicit-millisecond style used for every other duration in this file.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 30 * 1000L));

        return env;
    }

}
